import { Tabs } from "nextra/components";
import UniversalTabs from "../../../components/UniversalTabs";

# Child Workflows

While workflows in Hatchet implement a DAG, there are many cases where the structure of a workflow isn't known ahead of time. For example, you may have a batch processing workflow where the number of tasks is determined by the input, such as running a workflow per page in a PDF. In these cases, you can use child workflows to dynamically create new workflows as needed.

Note that spawning child workflows is a **durable** operation, meaning that spawning a child workflow will persist the state of the parent workflow. This means that if the parent workflow is interrupted, it will pick up the child workflow in the exact same state when it resumes. The index of the child workflow will be used as the default key, but custom keys can be specified if the order of the child workflows can vary.

## Looping Workflows

To loop through a list and create a new workflow per element in the list, you can simply use `spawnWorkflow` in a loop. For example:

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
@hatchet.workflow("fanout:create")
class Parent:
    """Parent workflow whose single step spawns ten `Child` workflows."""

    @hatchet.step()
    def spawn(self, context: Context):
        # One spawn call per child; each child gets its own input and key.
        for index in range(10):
            child_input = {"a": str(index)}
            context.spawn_workflow("Child", child_input, key=f"child{index}")

        return {}

```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
// Parent workflow for the "fanout:create" event: its single step spawns ten
// "child-workflow" runs and returns without waiting for their results.
const parentWorkflow: Workflow = {
  id: "parent-workflow",
  description: "Example workflow for spawning child workflows",
  on: { event: "fanout:create" },
  steps: [
    {
      name: "parent-spawn",
      timeout: "10s",
      run: async (ctx) => {
        // Build the ten child inputs up front, then spawn one child per input.
        const childInputs = Array.from(
          { length: 10 },
          (_, index) => `child-input-${index}`,
        );

        for (const input of childInputs) {
          ctx.spawnWorkflow<string>("child-workflow", { input });
        }

        return {};
      },
    },
  ],
};
```

  </Tabs.Tab>
  <Tabs.Tab>

```go
// Register a parent workflow on the "fanout:create" event whose single step
// spawns ten child workflows in a loop.
w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name: "parent-workflow",
        On: worker.Event("fanout:create"),
        Description: "Example workflow for spawning child workflows.",
        Steps: []*worker.WorkflowStep{
            worker.Fn(func(ctx worker.HatchetContext) error {
                for i := 0; i < 10; i++ {
                    childInput := "child-input-" + strconv.Itoa(i)
                    // The child handle is not needed here, so it is discarded;
                    // only the spawn error is checked.
                    _, err := ctx.SpawnWorkflow("child-workflow", childInput, &worker.SpawnWorkflowOpts{})
                    if err != nil {
                        // The step function returns a single error value.
                        return err
                    }
                }
                return nil
            }),
        },
    },
)
```

  </Tabs.Tab>
</UniversalTabs>

Note that calling `spawnWorkflow` will return immediately, and the child workflows will run in parallel. If you want to wait for child workflows, you can await the results of each workflow (see below).

## Spawning Workflows in Bulk

If you have a large number of workflows to spawn, you can use the `spawnWorkflows` method (`spawn_workflows` in Python, `SpawnWorkflows` in Go) to spawn multiple workflows in parallel. This method returns a list of workflow reference objects that you can use to wait for the results of each workflow.
The advantage of doing this over spawning workflows sequentially in a loop is that the workflows will be spawned in parallel with one request to Hatchet instead of one request per workflow.

There is a maximum limit of 1000 workflows that can be spawned in a single request.

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
@hatchet.workflow("fanout:create")
class Parent:
    """Parent workflow whose single step spawns ten `Child` workflows in bulk."""

    @hatchet.step()
    def spawn(self, context: Context):
        # Build one request per child before submitting them together.
        workflow_requests = []
        for index in range(10):
            workflow_requests.append(
                {
                    "workflow": "Child",
                    "input": {"a": str(index)},
                    "key": f"child{index}",
                    "options": {},
                }
            )

        # A single spawn_workflows call submits the whole batch.
        context.spawn_workflows(workflow_requests)
        return {}

```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
// Parent workflow for the "fanout:create" event: its single step builds ten
// child requests and submits them with one spawnWorkflows call.
const parentWorkflow: Workflow = {
  id: "parent-workflow",
  description: "Example workflow for spawning child workflows",
  on: { event: "fanout:create" },
  steps: [
    {
      name: "parent-spawn",
      timeout: "10s",
      run: async (ctx) => {
        // One request per child index 0..9.
        const workflowRequests = [...Array(10).keys()].map((index) => ({
          workflow: "child-workflow",
          input: { input: `child-input-${index}` },
          options: { additionalMetadata: { childKey: `child-${index}` } },
        }));

        // The step's return value is whatever spawnWorkflows resolves to.
        return await ctx.spawnWorkflows<string, string>(workflowRequests);
      },
    },
  ],
};
```

  </Tabs.Tab>
  <Tabs.Tab>

```go
// Register a parent workflow on the "fanout:create" event whose single step
// spawns ten child workflows with one bulk call.
w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name: "parent-workflow",
        On: worker.Event("fanout:create"),
        Description: "Example workflow for spawning child workflows.",
        Steps: []*worker.WorkflowStep{
            worker.Fn(func(ctx worker.HatchetContext) error {
                // Prepare the batch of workflows to spawn
                childWorkflows := make([]*worker.SpawnWorkflowsOpts, 10)

                for i := 0; i < 10; i++ {
                    childInput := "child-input-" + strconv.Itoa(i)
                    childWorkflows[i] = &worker.SpawnWorkflowsOpts{
                        WorkflowName: "child-workflow",
                        Input:        childInput,
                        Key:          "child-key-" + strconv.Itoa(i),
                    }
                }

                // Spawn all workflows in bulk using SpawnWorkflows. The returned
                // child handles are not used in this example, so they are discarded.
                _, err := ctx.SpawnWorkflows(childWorkflows)
                if err != nil {
                    return err
                }

                return nil
            }),
        },
    },
)

```

  </Tabs.Tab>
</UniversalTabs>

## Scatter/Gather Workflows

To run all child workflows in parallel, and then wait for all of them to complete, you can use the `result` method on the workflow reference returned by each spawn call.

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
@hatchet.workflow()
class Parent:
    """Parent workflow that spawns ten `Child` workflows and gathers their results."""

    @hatchet.step(timeout="5m")
    async def spawn(self, context: Context):
        pending = []

        for index in range(10):
            # Spawn the child first, then queue its result for gathering below.
            child = await context.aio.spawn_workflow(
                "Child", {"a": str(index)}, key=f"child{index}"
            )
            pending.append(child.result())

        # Wait for every queued child result at once.
        outputs = await asyncio.gather(*pending)

        return {"results": outputs}

@hatchet.workflow()
class Child:
    """Child workflow that echoes its "a" input inside a status string."""

    @hatchet.step()
    async def process(self, context: Context):
        workflow_input = context.workflow_input()
        return {"status": "success " + workflow_input["a"]}

```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
// Parent workflow for the "fanout:create" event: its single step spawns five
// "child-workflow" runs and waits for all of their results.
const parentWorkflow: Workflow = {
  id: "parent-workflow",
  description: "Example workflow for spawning child workflows",
  on: { event: "fanout:create" },
  steps: [
    {
      name: "parent-spawn",
      timeout: "10s",
      run: async (ctx) => {
        // Spawn every child first and keep a promise for each result.
        const pending: Promise<string>[] = Array.from(
          { length: 5 },
          (_, index) =>
            ctx
              .spawnWorkflow<string>("child-workflow", {
                input: `child-input-${index}`,
              })
              .result(),
        );

        // Resolve once every child has produced its result.
        const results = await Promise.all(pending);

        return { results };
      },
    },
  ],
};
```

  </Tabs.Tab>
  <Tabs.Tab>

```go
// Register a parent workflow on the "fanout:create" event whose single step
// spawns ten child workflows concurrently and collects their results.
w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name: "parent-workflow",
        On: worker.Event("fanout:create"),
        Description: "Example workflow for spawning child workflows.",
        Steps: []*worker.WorkflowStep{
            worker.Fn(func(ctx worker.HatchetContext) error {
                var wg sync.WaitGroup
                // Length 0 with capacity 10: append below must not leave ten
                // empty strings in front of the real results.
                results := make([]string, 0, 10)
                // Buffered so that no goroutine blocks while sending its result.
                resultCh := make(chan string, 10)
                for i := 0; i < 10; i++ {
                    wg.Add(1)
                    go func(i int) {
                        defer wg.Done()
                        childInput := "child-input-" + strconv.Itoa(i)
                        childWorkflow, err := ctx.SpawnWorkflow("child-workflow", childInput, &worker.SpawnWorkflowOpts{})
                        if err != nil {
                            // Handle error here
                            return
                        }
                        // Collect the result from the child workflow
                        result, err := childWorkflow.Result()
                        if err != nil {
                            // Handle error here
                            return
                        }
                        resultCh <- result
                    }(i)
                }
                // Close the channel once every goroutine has finished so the
                // range loop below terminates.
                go func() {
                    wg.Wait()
                    close(resultCh)
                }()

                // Collect all results
                for result := range resultCh {
                    results = append(results, result)
                }

                return nil
            }),
        },
    },
)

```

  </Tabs.Tab>
</UniversalTabs>

## Error Handling

If child workflows fail, an error will be raised to the parent, which can be caught and handled as needed.
For example, to spawn a recovery workflow if a child workflow fails:

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
@hatchet.workflow("fanout:create")
class Parent:
    """Parent workflow that spawns one `Child` and a recovery workflow if it fails."""

    @hatchet.step()
    async def spawn(self, context: Context):
        try:
            child = await context.aio.spawn_workflow(
                "Child", {"a": "0"}, key="child0"
            )
            # The result must be awaited inside the try block; otherwise a
            # failure of the child is never raised here.
            await child.result()
        except Exception as e:
            # Spawn a recovery workflow
            context.spawn_workflow("recovery-workflow", {"error": str(e)})

        return {}

```

  </Tabs.Tab>

  <Tabs.Tab>

```typescript
// Parent workflow for the "fanout:create" event: its single step spawns one
// child workflow and, if waiting on its result throws, spawns a recovery workflow.
const parentWorkflow: Workflow = {
  id: "parent-workflow",
  description: "Example workflow for spawning child workflows",
  on: {
    event: "fanout:create",
  },
  steps: [
    {
      name: "parent-spawn",
      timeout: "10s",
      run: async (ctx) => {
        try {
          const result = await ctx
            .spawnWorkflow<string>("child-workflow", { input: `child-input` })
            .result();
          return result;
        } catch (e) {
          // Spawn a recovery workflow. An Error instance JSON-serializes to
          // `{}`, so pass its message text rather than the raw caught value.
          const message = e instanceof Error ? e.message : String(e);
          ctx.spawnWorkflow<string>("recovery-workflow", { error: message });
        }

        return {};
      },
    },
  ],
};
```

  </Tabs.Tab>

  <Tabs.Tab>

```go
// Register a parent workflow on the "fanout:create" event whose single step
// spawns one child workflow and, if the child's result is an error, spawns a
// recovery workflow.
w.RegisterWorkflow(
    &worker.WorkflowJob{
        Name: "parent-workflow",
        On: worker.Event("fanout:create"),
        Description: "Example workflow for spawning child workflows.",
        Steps: []*worker.WorkflowStep{
            worker.Fn(func(ctx worker.HatchetContext) error {
                childInput := "child-input"
                childWorkflow, err := ctx.SpawnWorkflow("child-workflow", childInput, &worker.SpawnWorkflowOpts{})
                if err != nil {
                    // The child could not be spawned at all.
                    return err
                }
                // Wait for the child workflow; its result value is not used here.
                if _, err := childWorkflow.Result(); err != nil {
                    // Spawn a recovery workflow
                    recInput := err.Error()
                    _, err = ctx.SpawnWorkflow("recovery-workflow", recInput, &worker.SpawnWorkflowOpts{})
                    // Nil when the recovery workflow was spawned successfully.
                    return err
                }
                return nil
            }),
        },
    },
)

```

  </Tabs.Tab>
</UniversalTabs>
